package drds.plus.executor.cursor.cursor.impl.join;

import drds.plus.common.properties.ConnectionParams;
import drds.plus.executor.ExecuteContext;
import drds.plus.executor.cursor.cursor.IIndexNestLoopCursor;
import drds.plus.executor.cursor.cursor.ISortingCursor;
import drds.plus.executor.cursor.cursor.impl.DuplicateValueRowDataLinkedList;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaData;
import drds.plus.executor.cursor.cursor_metadata.CursorMetaDataImpl;
import drds.plus.executor.record_codec.record.NamedRecord;
import drds.plus.executor.record_codec.record.Record;
import drds.plus.executor.row_values.ArrayRowValues;
import drds.plus.executor.row_values.RowValues;
import drds.plus.sql_process.abstract_syntax_tree.configuration.ColumnMetaData;
import drds.plus.sql_process.abstract_syntax_tree.execute_plan.query.Join;
import drds.tools.Threads;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Index nested-loop join cursor that fetches right-side rows in batches.
 *
 * <p>Each round pulls a batch of rows from the left cursor, collects their join keys, asks the
 * right cursor for all rows matching those keys in a single multi-get call, and then emits the
 * joined rows one at a time.
 */
public class IndexNestedLoopMgetImpCursor extends IndexNestLoopCursor implements IIndexNestLoopCursor {
    ExecuteContext executeContext;
    Join join;
    /**
     * True when the join is a left outer join and not a right outer join; recomputed on every
     * call to {@link #forward(boolean)}.
     */
    boolean isLeftJoin = false;

    /**
     * Iterator over the batch of rows most recently fetched from the left cursor. {@code null}
     * means a new batch has to be fetched.
     */
    Iterator<RowValues> leftRowDataIterator = null;

    /**
     * Iterator over the join keys of the current left batch. It runs in lock-step with
     * {@link #leftRowDataIterator}: the i-th key belongs to the i-th left row.
     */
    Iterator<Record> rightJoinRecordListIterator = null;
    /**
     * Metadata of the right cursor's columns; built lazily the first time a left join needs an
     * all-null right row.
     */
    protected CursorMetaData rightCursorMetaData = null;
    /**
     * Right-side rows fetched for the current batch, keyed by join key.
     */
    Map<Record, DuplicateValueRowDataLinkedList> rightRecordToDuplicateValueRowDataLinkedListMap;
    /**
     * Next not-yet-emitted right row for the current left row when several right rows share the
     * same join key; {@code null} when the chain is exhausted.
     */
    DuplicateValueRowDataLinkedList rightDuplicateValueRowDataLinkedList;

    /**
     * Number of left rows (join keys) sent to the right side in one batch. The default is
     * overwritten from the connection parameters in the constructor.
     */
    int sizeKeyLimination = 20;
    /**
     * Limit on rows returned from the right side, loaded from the connection parameters in the
     * constructor. It is not read anywhere else in this class. The original note said the default
     * assumes 25 distinct values per key.
     * NOTE(review): 20 keys x 25 values is 500, not 5000 - confirm the intended ratio.
     */
    int sizeRetLimination = 5000;

    /**
     * Wall-clock time in milliseconds at which this cursor was constructed; the timeout is
     * measured from here.
     */
    private final long executeStartTime = System.currentTimeMillis();
    /**
     * Timeout in milliseconds; a value of zero or less disables the check.
     */
    private final long timeout;
    /**
     * Number of batch fetches attempted so far. Only counted while the iteration limit is enabled.
     */
    private long iterationTimes = 0L;
    /**
     * Maximum number of batch fetches; a value of zero or less disables the check.
     */
    private final long maxIterationTimes;


    /**
     * Creates the cursor and loads the iteration limit, timeout and batch sizes from the
     * connection parameters of the execute context.
     *
     * @param executeContext    execution context supplying the parameters manager
     * @param join              join plan node, used to set the left/right outer-join flags
     * @param leftCursor        cursor producing the left (driving) rows
     * @param leftJoinItemList  join-on items of the left side
     * @param rightCursor       cursor queried with the batched join keys
     * @param rightJoinItemList join-on items of the right side
     */
    public IndexNestedLoopMgetImpCursor(ExecuteContext executeContext, Join join, ISortingCursor leftCursor, List leftJoinItemList, ISortingCursor rightCursor, List rightJoinItemList) throws RuntimeException {
        super(leftCursor, leftJoinItemList, rightCursor, rightJoinItemList);
        this.executeContext = executeContext;
        this.join = join;
        setLeftRightJoin(join);
        // Primitive fields avoid boxing on every fetch(); the int batch sizes below are already
        // populated the same way.
        maxIterationTimes = executeContext.getParametersManager().getLong(ConnectionParams.MAX_INDEX_NESTED_LOOP_ITERATION_TIMES);
        timeout = executeContext.getParametersManager().getLong(ConnectionParams.INDEX_NESTED_LOOP_TIME_OUT);
        sizeKeyLimination = executeContext.getParametersManager().getInt(ConnectionParams.COUNT_OF_KEY_TO_RIGHT_INDEX_NESTED_LOOP);
        sizeRetLimination = executeContext.getParametersManager().getInt(ConnectionParams.MAX_ROW_RETURN_FROM_RIGHT_INDEX_NESTED_LOOP);
    }


    /**
     * Produces the next joined row, fetching a new batch whenever the current one is used up.
     *
     * @param forward direction flag handed through to the left cursor read
     * @return the next joined row, or {@code null} when no further batch can be fetched
     */
    protected RowValues forward(boolean forward) throws RuntimeException {
        isLeftJoin = isLeftOutJoin() && !isRightOutJoin();
        while (true) {
            // The left iterator is the single indicator of whether a batch is currently loaded.
            if (leftRowDataIterator == null) {
                boolean hasMore = fetch(forward);
                if (!hasMore) {
                    // No new batch: the result set is finished.
                    return null;
                }
            }
            RowValues rowData = join(leftRowDataIterator, rightJoinRecordListIterator, rightRecordToDuplicateValueRowDataLinkedListMap);
            if (rowData != null) {
                return rowData;
            }
            // Batch exhausted: drop its state so the next loop pass fetches a fresh batch
            // (or ends the iteration when there is none).
            leftRowDataIterator = null;
            rightRecordToDuplicateValueRowDataLinkedListMap = null;
            rightJoinRecordListIterator = null;
        }
    }

    /**
     * Loads the next batch: reads left rows, then fetches the matching right rows in one call.
     *
     * @param forward direction flag handed through to the left cursor read
     * @return {@code true} if a batch was loaded; {@code false} if the left side is exhausted,
     * the iteration limit was exceeded, or the timeout elapsed
     */
    private boolean fetch(boolean forward) throws RuntimeException {
        // The counter is only advanced while the limit is enabled.
        if (maxIterationTimes > 0 && ++iterationTimes > maxIterationTimes) {
            return false;
        }
        if (timeout > 0 && System.currentTimeMillis() - executeStartTime >= timeout) {
            return false;
        }

        List<RowValues> leftRowDataList = new ArrayList<RowValues>(sizeKeyLimination);
        List<Record> rightJoinRecordList = new ArrayList<Record>(sizeKeyLimination);
        boolean hasMore = cache(leftRowDataList, rightJoinRecordList, forward);
        if (!hasMore) {
            return false;
        }
        // Original note: with index nested loop the right table is always queried by its key.
        rightRecordToDuplicateValueRowDataLinkedListMap = getRightRecordToDuplicateValueRowDataLinkedListMap(rightJoinRecordList);

        leftRowDataIterator = leftRowDataList.iterator();
        rightJoinRecordListIterator = rightJoinRecordList.iterator();
        return true;
    }


    /**
     * Reads up to {@link #sizeKeyLimination} rows from the left cursor into the given lists.
     *
     * @param leftRowDataList     receives the left rows of the batch
     * @param rightJoinRecordList receives the join key of each left row, in the same order
     * @param forward             direction flag handed through to the left cursor read
     * @return {@code true} if at least one row was read
     */
    private boolean cache(List<RowValues> leftRowDataList, List<Record> rightJoinRecordList, boolean forward) throws RuntimeException {

        int currentSize = 0;
        boolean hasMore = false;
        while (getLeftCursorRowData(forward) != null) {
            // A single row is enough to count as "has more".
            hasMore = true;
            Threads.checkInterrupted();
            // Defined in the superclass (not shown here); expected to populate rightJoinRecord
            // with the join key of the current left row, which is then used for the right lookup.
            setRightJoinRecord();
            leftRowDataList.add(leftRowData);
            rightJoinRecordList.add(rightJoinRecord);
            currentSize++;
            if (sizeKeyLimination <= currentSize) {
                return true;
            }
        }
        // Left side ran dry: clear the shared fields, which other code may still read.
        leftRowData = null;
        rightJoinRecord = null;
        return hasMore;
    }

    /**
     * Fetches the right-side rows for a batch of join keys in a single call.
     *
     * @param recordList join keys of the current left batch
     * @return right rows grouped by join key, as returned by the right cursor; the two boolean
     * flags are passed as {@code (false, true)} and their meaning is defined by that cursor
     */
    protected Map<Record, DuplicateValueRowDataLinkedList> getRightRecordToDuplicateValueRowDataLinkedListMap(List<Record> recordList) throws RuntimeException {

        return rightCursor.mgetRecordToDuplicateValueRowDataLinkedListMap(recordList, false, true);
    }

    /**
     * Emits the next joined row of the current batch.
     *
     * <p>If right rows with the same key as the current left row are still pending, the left
     * side does not advance and the next pending right row is joined. Otherwise left rows are
     * consumed until one finds a match, or - for a left join - until any row is available, in
     * which case an unmatched row is joined with an all-null right row.
     *
     * @param leftRowDataIterator                             iterator over the left rows of the batch
     * @param rightJoinRecordListIterator                     iterator over the join keys taken from
     *                                                        those left rows
     * @param rightRecordToDuplicateValueRowDataLinkedListMap right rows fetched for the batch; the
     *                                                        key is the join key, the value is the
     *                                                        linked list of right rows sharing it
     * @return the joined row, or {@code null} when the left rows of the batch are used up
     * @throws IllegalStateException if the key iterator ends before the left row iterator
     */
    protected RowValues join(Iterator<RowValues> leftRowDataIterator, Iterator<Record> rightJoinRecordListIterator, Map<Record, DuplicateValueRowDataLinkedList> rightRecordToDuplicateValueRowDataLinkedListMap) throws RuntimeException {

        if (rightDuplicateValueRowDataLinkedList != null) {
            // More right rows share the current key: keep the left row, take the next right row.
            return joinPendingRightRow();
        }
        while (leftRowDataIterator.hasNext()) {
            leftRowData = leftRowDataIterator.next();
            if (!rightJoinRecordListIterator.hasNext()) {
                throw new IllegalStateException("should not be here . leftJoinItemList is end, but leftNode kvPair is not");
            }
            Record rightJoinRecord = rightJoinRecord(rightJoinRecordListIterator);
            rightDuplicateValueRowDataLinkedList = rightRecordToDuplicateValueRowDataLinkedListMap.get(rightJoinRecord);
            if (rightDuplicateValueRowDataLinkedList != null) {
                // Match found.
                return joinPendingRightRow();
            }
            if (isLeftJoin) {
                return joinLeftRowWithNullRightRow();
            }
            // Inner join without a match: skip this left row.
        }
        // Left rows of this batch are used up.
        return null;
    }

    /**
     * Joins the current left row with the right row at the head of the pending chain, then
     * advances the chain so the next call picks up the following duplicate.
     */
    private RowValues joinPendingRightRow() {
        currentJoinRowData = join(leftRowData, rightDuplicateValueRowDataLinkedList.rowData);
        rightDuplicateValueRowDataLinkedList = rightDuplicateValueRowDataLinkedList.next;
        return currentJoinRowData;
    }

    /**
     * Left-join fallback: joins the current left row with a right row whose values are all null.
     * The right cursor metadata is built on first use and reused afterwards.
     */
    private RowValues joinLeftRowWithNullRightRow() {
        try {
            List<ColumnMetaData> leftCursorColumnMetaDataList = this.leftCursor.getColumnMetaDataList();
            List<ColumnMetaData> rightCursorColumnMetaDataList = this.rightCursor.getColumnMetaDataList();
            if (this.rightCursorMetaData == null) {
                // Original note: both lists follow the returned columns, so they are consistent;
                // the combined result is ordered left-then-right.
                this.build(leftCursorColumnMetaDataList, rightCursorColumnMetaDataList);
                this.rightCursorMetaData = CursorMetaDataImpl.buildCursorMetaData(rightCursorColumnMetaDataList);
            } else {
                build(leftRowData.getParentCursorMetaData(), rightCursorMetaData);
            }
            // A freshly allocated Object[] holds only nulls, giving an all-null right row.
            RowValues rightRowData = new ArrayRowValues(rightCursorMetaData, new Object[rightCursorMetaData.getColumnMetaDataList().size()]);
            currentJoinRowData = join(leftRowData, rightRowData);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return currentJoinRowData;
    }

    /**
     * Takes the next join key from the iterator and wraps it in a {@link NamedRecord} named
     * after the first key of the record's column-name-to-value map.
     */
    private Record rightJoinRecord(Iterator<Record> recordIterator) {
        Record record = recordIterator.next();
        return new NamedRecord(record.getColumnNameToValueMap().keySet().iterator().next(), record);
    }

    /**
     * Returns the next row by delegating to the superclass.
     */
    public RowValues next() throws RuntimeException {
        return super.next();
    }

}
